#include "iot_to_aliyun.h"

#include <stdlib.h>
#include <stdio.h>
#include <iostream>
#include <memory>
#include "business_def.h"

//c++ version
#include "HMAC_SHA1.h"
#include "Base64.h"
//c version
#include "HMACSHA1.h"

#include <time.h>
#include <sys/timeb.h>

#include "producer_aliyun.h"
#include "consumer_aliyun.h"
#include "strchange.h"
#include "pfunc.h"
#include "Log.h"

#include "aliyun_iot_func.h"

#ifdef WIN32
#define usleep(x) Sleep(x)
#endif

namespace G_VALUE {
	extern int IOT_CODE;
};

void ALiyunEventOut::toVec()
{
	outs.clear();
	pyfree::string_divide(outs, aliEventOut, ";");
};

IOToAliyun::IOToAliyun(void)
	: running(true)
	, initLink(false)
	, pclient(NULL)
	, msg_buf(NULL)
	, msg_readbuf(NULL)
	, to_aliyun_event_queue(QueueDataSingle<JsonEvent>::getInstance())
	, cache_to_aliyun_queue(QueueDataSingle<SocketAliyunWriteItem>::getInstance())
	, cnt(1)
	, producer(NULL)
	, consumer(NULL)
{
	init();
}

IOToAliyun::~IOToAliyun(void)
{
	running = false;
	uninit();
}

void IOToAliyun::init()
{
	BusinessDef *ptr_CacheDataObj = BusinessDef::getInstance();
	to_aliyun_event_queue->setQueueDesc("queue_event_aliyun");
	pyfree::AliyunTriples gateway = ptr_CacheDataObj->getGateWay();
	gatewayTriples.product_key		= gateway.product_key;
	gatewayTriples.product_secret	= gateway.product_secret;
	gatewayTriples.device_name		= gateway.device_name;
	gatewayTriples.device_secret	= gateway.device_secret;
	pyfree::ALiyunEventInfo aliEI = ptr_CacheDataObj->getEventInfo();
	gatewayEI.aliEventFlag			= aliEI.aliEventFlag;
	gatewayEI.aliEventOut			= aliEI.aliEventOut;
	gatewayEI.toVec();
	ptr_CacheDataObj->getDevTriples(deviceTriples);
	{
		AliyunDeviceMaps topicMapDesc;
		topicMapDesc.id = ptr_CacheDataObj->getAreaInfo().areaId;
		topicMapDesc.subcribe_flag = false;
		char buf[256] = { 0 };
		//std::string alink_topic_dev_login_reply = "/ext/session/"
		//	+ gatewayTriples.product_key + "/" + gatewayTriples.device_name + "/combine/login_reply";
		//
		sprintf(buf, ALINK_TOPIC_DEV_LOGIN_REPLY
			, gatewayTriples.product_key.c_str()
			, gatewayTriples.device_name.c_str());
		std::string alink_topic_dev_login_reply = std::string(buf,strlen(buf));
		topicMapDesc.topicType = TOPIC_DEV_LOGIN_REPLY;
		subTopicMaps[alink_topic_dev_login_reply] = topicMapDesc;
		//
		memset(buf, 0, 256);
		sprintf(buf, ALINK_TOPIC_DEV_LOGOUT_REPLY
			, gatewayTriples.product_key.c_str()
			, gatewayTriples.device_name.c_str());
		std::string alink_topic_dev_logout_reply = std::string(buf, strlen(buf));
		topicMapDesc.topicType = TOPIC_DEV_LOGOUT_REPLY;
		subTopicMaps[alink_topic_dev_logout_reply] = topicMapDesc;
		//网关设备事件反馈
		memset(buf, 0, 256);
		sprintf(buf, ALINK_TOPIC_EVENT_UP_REPLY
			, gatewayTriples.product_key.c_str()
			, gatewayTriples.device_name.c_str()
			, gatewayEI.aliEventFlag.c_str());
		std::string alink_topic_event_up_reply = std::string(buf, strlen(buf));
		topicMapDesc.topicType = TOPIC_EVENT_CLIENT_UP_REPLY;
		subTopicMaps[alink_topic_event_up_reply] = topicMapDesc;
	}
	for (std::map<unsigned long long, AliyunDevices>::iterator it = deviceTriples.begin();
		it != deviceTriples.end(); ++it)
	{
		AliyunDeviceMaps topicMapDesc;
		topicMapDesc.id = it->first;
		topicMapDesc.subcribe_flag = false;
		for (std::map<unsigned int, std::string>::iterator itp = it->second.aliyun_keys.begin();
			itp!=it->second.aliyun_keys.end(); itp++)
		{
			topicMapDesc.aliyun_keys[itp->second] = itp->first;
		}
		char buf[256] = { 0 };
		
		//std::string alink_child_topic_prop_post_reply = "/sys/"
		//	+ it->second.atriples.product_key + "/"
		//	+ it->second.atriples.device_name + "/thing/event/property/post_reply";
		//属性上送反馈
		sprintf(buf, ALINK_TOPIC_EVENT_PRO_POST_REPLY
			, it->second.atriples.product_key.c_str()
			, it->second.atriples.device_name.c_str());
		std::string alink_child_topic_prop_post_reply = std::string(buf, strlen(buf));
		topicMapDesc.topicType = TOPIC_EVENT_PRO_POST_REPLY;
		subTopicMaps[alink_child_topic_prop_post_reply] = topicMapDesc;

		//std::string alink_child_topic_prop_set = "/sys/"
		//	+ it->second.atriples.product_key + "/"
		//	+ it->second.atriples.device_name + "/thing/service/property/set";
		//服务设置属性
		memset(buf, 0, 256);
		sprintf(buf, ALINK_TOPIC_SERVICE_PRO_SET
			, it->second.atriples.product_key.c_str()
			, it->second.atriples.device_name.c_str());
		std::string alink_child_topic_prop_set = std::string(buf, strlen(buf));
		topicMapDesc.topicType = TOPIC_SERVICE_PRO_SET;
		subTopicMaps[alink_child_topic_prop_set] = topicMapDesc;
		//dev_tag and dev_desired isn't refer to point info
		topicMapDesc.aliyun_keys.clear();
		//标签上送和删除反馈
		memset(buf, 0, 256);
		sprintf(buf, ALINK_TOPIC_TAG_UPDATE_REPLY
			, it->second.atriples.product_key.c_str()
			, it->second.atriples.device_name.c_str());
		std::string alink_child_topic_tag_up_reply = std::string(buf, strlen(buf));
		topicMapDesc.topicType = TOPIC_DEV_TAG_UPDATE_REPLY;
		subTopicMaps[alink_child_topic_tag_up_reply] = topicMapDesc;
		memset(buf, 0, 256);
		sprintf(buf, ALINK_TOPIC_TAG_DELETE_REPLY
			, it->second.atriples.product_key.c_str()
			, it->second.atriples.device_name.c_str());
		std::string alink_child_topic_tag_del_reply = std::string(buf, strlen(buf));
		topicMapDesc.topicType = TOPIC_DEV_TAG_DEL_REPLY;
		subTopicMaps[alink_child_topic_tag_del_reply] = topicMapDesc;
		////各个设备事件反馈
		//memset(buf, 0, 128);
		//sprintf(buf, ALINK_TOPIC_EVENT_UP_REPLY
		//	, it->second.atriples.product_key.c_str()
		//	, it->second.atriples.device_name.c_str()
		//	, it->second.aliEI.aliEventFlag.c_str());
		//std::string alink_child_topic_event_up_reply = std::string(buf, strlen(buf));
		//topicMapDesc.topicType = TOPIC_EVENT_CLIENT_UP_REPLY;
		//subTopicMaps[alink_child_topic_event_up_reply] = topicMapDesc;
	}
	consumer = new ConsumerAliyun(subTopicMaps);
	consumer->start();
	if (NULL == (msg_buf = (char *)HAL_Malloc(MQTT_MSGLEN))) {
		CLogger::createInstance()->Log(MsgInfo
			, "%s|%03d :: not enough memory"
			, __func__
			, __LINE__);
	}

	if (NULL == (msg_readbuf = (char *)HAL_Malloc(MQTT_MSGLEN))) {
		CLogger::createInstance()->Log(MsgInfo
			, "%s|%03d :: not enough memory"
			, __func__
			, __LINE__);
	}
	IOT_OpenLog("mqtt");
	IOT_SetLogLevel(IOT_LOG_DEBUG);

	HAL_SetProductKey((char*)gatewayTriples.product_key.c_str());
	HAL_SetProductSecret((char*)gatewayTriples.product_secret.c_str());
	HAL_SetDeviceName((char*)gatewayTriples.device_name.c_str());
	HAL_SetDeviceSecret((char*)gatewayTriples.device_secret.c_str());
};

void IOToAliyun::uninit()
{
	if (NULL != msg_buf) {
		HAL_Free(msg_buf);
	}

	if (NULL != msg_readbuf) {
		HAL_Free(msg_readbuf);
	}
	IOT_DumpMemoryStats(IOT_LOG_DEBUG);
	IOT_CloseLog();
	if (NULL != consumer)
	{
		delete consumer;
		consumer = NULL;
	}
};

void IOToAliyun::create()
{
	iotx_conn_info_pt pconn_info;
	iotx_mqtt_param_t mqtt_params;
	
	/* Device AUTH */
	if (0 != IOT_SetupConnInfo(gatewayTriples.product_key.c_str()
		, gatewayTriples.device_name.c_str()
		, gatewayTriples.device_secret.c_str()
		, (void **)&pconn_info)) 
	{
		CLogger::createInstance()->Log(MsgInfo
			, "%s|%03d :: AUTH request failed!"
			, __func__, __LINE__);
		HAL_SleepMs(10000);
		return;
	}
	/* Initialize MQTT parameter */
	memset(&mqtt_params, 0x0, sizeof(mqtt_params));

	mqtt_params.port = pconn_info->port;
	mqtt_params.host = pconn_info->host_name;
	mqtt_params.client_id = pconn_info->client_id;
	mqtt_params.username = pconn_info->username;
	mqtt_params.password = pconn_info->password;
	mqtt_params.pub_key = pconn_info->pub_key;

	mqtt_params.request_timeout_ms = 2000;
	mqtt_params.clean_session = 0;
	mqtt_params.keepalive_interval_ms = 300000;
	//
	mqtt_params.pread_buf = msg_readbuf;
	mqtt_params.read_buf_size = MQTT_MSGLEN;
	mqtt_params.pwrite_buf = msg_buf;
	mqtt_params.write_buf_size = MQTT_MSGLEN;

	mqtt_params.handle_event.h_fp = event_handle;
	mqtt_params.handle_event.pcontext = NULL;

	/* Construct a MQTT client with specify parameter */
	pclient = IOT_MQTT_Construct(&mqtt_params);
	if (NULL == pclient) {
		CLogger::createInstance()->Log(MsgInfo
			, "%s|%03d :: MQTT construct failed"
			, __func__
			, __LINE__);
		HAL_SleepMs(10000);
	}
	else
	{
		//2019-11-18 add,输出构造成功日志,方便问题排查
		CLogger::createInstance()->Log(MsgInfo
			, "%s|%03d :: MQTT construct success!"
			, __func__
			, __LINE__);
	}
}

void IOToAliyun::destroy()
{
	if (NULL == pclient)
		return;
	IOT_MQTT_Destroy(&pclient);
}
//订购信息太多,无法分配句柄,暂停事件的独立返回处置,由标准回调函数处理
void IOToAliyun::subscribe(bool gatewayFlag)
{
	int rc = 0;
	for (std::map<std::string, AliyunDeviceMaps>::iterator it = subTopicMaps.begin();
		it!= subTopicMaps.end(); ++it)
	{
		if (it->second.subcribe_flag) {
			//同一个主题,只要订阅过一次就行了，程序重启也不影响的，不需要再次订阅。
			continue;
		}
		if (gatewayFlag) 
		{
			switch (it->second.topicType)
			{
			//注释
			//case TOPIC_DEV_LOGIN_REPLY:
			//	rc = IOT_MQTT_Subscribe(pclient, it->first.c_str(), IOTX_MQTT_QOS0, login_message_arrive, NULL);
			//  it->second.subcribe_flag = true;
			//	break;
			//case TOPIC_DEV_LOGOUT_REPLY:
			//	rc = IOT_MQTT_Subscribe(pclient, it->first.c_str(), IOTX_MQTT_QOS0, logout_message_arrive, NULL);
			//  it->second.subcribe_flag = true;
			//	break;
			case TOPIC_EVENT_CLIENT_UP_REPLY:
				rc = IOT_MQTT_Subscribe(pclient, it->first.c_str(), IOTX_MQTT_QOS0, event_up_reply_arrive, NULL);
				it->second.subcribe_flag = true;
				break;
			default:
				continue;
			}
		}
		else {
			switch (it->second.topicType)
			{
			//过多指定订购阿里云物联网平台支持要限制,不特定指定回调的采用通用回调函数(event_handle)处理,
			//然后再在通用回调函数中去调用特定回调函数处理
			case TOPIC_EVENT_PRO_POST_REPLY:
			//	rc = IOT_MQTT_Subscribe(pclient, it->first.c_str(), IOTX_MQTT_QOS0, push_reply_message_arrive, NULL);
			//  it->second.subcribe_flag = true;
			//	break;
			case TOPIC_SERVICE_PRO_SET:
			//	rc = IOT_MQTT_Subscribe(pclient, it->first.c_str(), IOTX_MQTT_QOS0, service_set_message_arrive, NULL);
			//  it->second.subcribe_flag = true;
			//	break;
			case TOPIC_DEV_TAG_UPDATE_REPLY:
			//	rc = IOT_MQTT_Subscribe(pclient, it->first.c_str(), IOTX_MQTT_QOS0, tag_update_reply_arrive, NULL);
			//  it->second.subcribe_flag = true;
			//	break;
			case TOPIC_DEV_TAG_DEL_REPLY:
			//	rc = IOT_MQTT_Subscribe(pclient, it->first.c_str(), IOTX_MQTT_QOS0, tag_del_reply_arrive, NULL);
			//  it->second.subcribe_flag = true;
			//	break;
			default:
				continue;
			}
		}
		IOT_MQTT_Yield(pclient, 200);
		if (rc < 0) {
			CLogger::createInstance()->Log(MsgInfo
				, "%s|%03d :: IOT_MQTT_Subscribe() failed, rc = %d"
				, __func__
				, __LINE__
				, it->first.c_str()
				, rc);
		}
		else {
			Print_NOTICE("IOT_MQTT_Subscribe(%s) success!\n", it->first.c_str());
		}
		HAL_SleepMs(100);
	}
}

void IOToAliyun::unsubscribe()
{
	for (std::map<std::string, AliyunDeviceMaps>::iterator it = subTopicMaps.begin();
		it!= subTopicMaps.end();++it)
	{
		if (it->second.subcribe_flag)
		{
			IOT_MQTT_Unsubscribe(pclient, it->first.c_str());
			it->second.subcribe_flag = false;
			IOT_MQTT_Yield(pclient, 200);
		}
	}
}

void IOToAliyun::devOnLine()
{
	for (std::map<unsigned long long, AliyunDevices>::iterator it = deviceTriples.begin();
		it != deviceTriples.end(); ++it)
	{
		int msg_len = 0;
		char msg_pub[256] = { 0 };
		//生成签名
		char hmac_buf[256] = { 0 };
		char temp_comment[256] = { 0 };
		struct timeb tm0;
		ftime(&tm0);
		long long timestamp = tm0.time * 1000 + tm0.millitm;
		//"clientId%s&%sdeviceName%sproductKey%stimestamp%lld"
		sprintf(temp_comment, ALINK_COMMENT_FORMAT
			, it->second.atriples.product_key.c_str()
			, it->second.atriples.device_name.c_str()
			, it->second.atriples.device_name.c_str()
			, it->second.atriples.product_key.c_str()
			, timestamp);
		#ifdef WIN32
		hmac_sha((char*)it->second.atriples.device_secret.c_str()
			, (int)(it->second.atriples.device_secret.length())
			, (char*)temp_comment
			, (int)(strlen(temp_comment))
			, hmac_buf
			, 20);
		#else
		CHMAC_SHA1 sha1;
		sha1.HMAC_SHA1((unsigned char*)temp_comment
			, static_cast<int>(strlen(temp_comment))
			, (unsigned char*)it->second.atriples.device_secret.c_str()
			, static_cast<int>(it->second.atriples.device_secret.length())
			, (unsigned char*)hmac_buf);
		#endif
		Print_NOTICE("DEVICE_SECRET:%s,temp_comment:%s,hmac_buf:%s\n"
			, it->second.atriples.device_secret.c_str()
			, temp_comment, hmac_buf);
		std::string hmac_str_hex = "";
		for (size_t i = 0; i < strlen(hmac_buf); i++)
		{
			char buf[32] = { 0 };
			sprintf(buf, "%02hhx ", hmac_buf[i]);
			hmac_str_hex += std::string(buf, 2);
		}
		Print_NOTICE("%s\n", hmac_str_hex.c_str());
		memset(msg_pub, 0x0, 256);
		//
		msg_len = sprintf(msg_pub, login_format
			, cnt++
			, it->second.atriples.product_key.c_str()
			, it->second.atriples.device_name.c_str()
			, it->second.atriples.product_key.c_str()
			, it->second.atriples.device_name.c_str()
			, timestamp, hmac_str_hex.c_str());

		Print_NOTICE("msg_pub(%d)=%s\n", msg_len, msg_pub);
		iotx_mqtt_topic_info_t topic_msg;
		memset(&topic_msg, 0x0, sizeof(iotx_mqtt_topic_info_t));
		topic_msg.qos = IOTX_MQTT_QOS0;
		topic_msg.retain = 0;
		topic_msg.dup = 0;
		topic_msg.payload = (char *)msg_pub;
		topic_msg.payload_len = msg_len;

		int rc = 0;
		//std::string alink_topic_dev_login_ = "/ext/session/"
		//	+ gatewayTriples .product_key+"/"
		//	+ gatewayTriples .device_name+"/combine/login";
		char buf[128] = { 0 };
		sprintf(buf, ALINK_TOPIC_DEV_LOGIN
			, gatewayTriples.product_key.c_str()
			, gatewayTriples.device_name.c_str());
		std::string alink_topic_dev_login_ = std::string(buf, strlen(buf));
		rc = IOT_MQTT_Publish(pclient, alink_topic_dev_login_.c_str(), &topic_msg);
		IOT_MQTT_Yield(pclient, 200);
		if (rc < 0)
		{
			//printf("error publish\n");
			CLogger::createInstance()->Log(MsgInfo
				, "%s|%03d :: \n publish message fail: \n topic: %s\n payload(%d): %s\n rc = %d"
				, __func__
				, __LINE__
				, alink_topic_dev_login_.c_str()
				, topic_msg.payload_len
				, topic_msg.payload
				, rc);
			//
		}
		else {
			EXAMPLE_TRACE("packet-id=%u, publish topic msg=%s"
				, (uint32_t)rc, msg_pub);
		}
	}
}

void IOToAliyun::devOffLine()
{
	for (std::map<unsigned long long, AliyunDevices>::iterator it = deviceTriples.begin();
		it != deviceTriples.end(); ++it)
	{
		int msg_len = 0;
		char msg_pub[256] = { 0 };
		memset(msg_pub, 0x0, 256);
		//
		msg_len = sprintf(msg_pub, logout_format
			, cnt++
			, it->second.atriples.product_key.c_str()
			, it->second.atriples.device_name.c_str());

		Print_NOTICE("msg_pub(%d)=%s\n", msg_len, msg_pub);
		iotx_mqtt_topic_info_t topic_msg;
		memset(&topic_msg, 0x0, sizeof(iotx_mqtt_topic_info_t));
		topic_msg.qos = IOTX_MQTT_QOS0;
		topic_msg.retain = 0;
		topic_msg.dup = 0;
		topic_msg.payload = (char *)msg_pub;
		topic_msg.payload_len = msg_len;

		int rc = 0;
		//std::string alink_topic_dev_login_ = "/ext/session/"
		//	+ gatewayTriples .product_key+"/"
		//	+ gatewayTriples .device_name+"/combine/login";
		char buf[128] = { 0 };
		sprintf(buf, ALINK_TOPIC_DEV_LOGOUT
			, gatewayTriples.product_key.c_str()
			, gatewayTriples.device_name.c_str());
		std::string alink_topic_dev_logout_ = std::string(buf, strlen(buf));
		rc = IOT_MQTT_Publish(pclient, alink_topic_dev_logout_.c_str(), &topic_msg);
		IOT_MQTT_Yield(pclient, 200);
		if (rc < 0)
		{
			//printf("error publish\n");
			CLogger::createInstance()->Log(MsgInfo
				, "%s|%03d :: \n publish message fail: \n topic: %s\n payload(%d): %s\n rc = %d"
				, __func__
				, __LINE__
				, alink_topic_dev_logout_.c_str()
				, topic_msg.payload_len
				, topic_msg.payload
				, rc);
			//
		}
		else {
			EXAMPLE_TRACE("packet-id=%u, publish topic msg=%s"
				, (uint32_t)rc, msg_pub);
		}
	}
};

void IOToAliyun::devTagUp()
{
	for (std::map<unsigned long long, AliyunDevices>::iterator itdev = deviceTriples.begin();
		itdev != deviceTriples.end(); ++itdev)
	{
		if (itdev->second.dev_tags.empty())
		{
			continue;
		}
		std::string params = "";
		std::map<std::string, std::string>::iterator itTagLast = itdev->second.dev_tags.end();
		itTagLast--;
		for (std::map<std::string, std::string>::iterator itTag = itdev->second.dev_tags.begin();
			itTag!= itdev->second.dev_tags.end();++itTag)
		{
			char buf[128] = { 0 };
			sprintf(buf,"{\"attrKey\":\"%s\",\"attrValue\":\"%s\"}"
				, itTag->first.c_str()
				,itTag->second.c_str());
			if (strlen(buf) > 0) 
			{
				params += std::string(buf, strlen(buf));
				if (itTag != itTagLast)
				{
					params += ",";
				}
			}
		}
		if (params.empty())
			continue;
		//
		iotx_mqtt_topic_info_t topic_msg;
		memset(&topic_msg, 0x0, sizeof(iotx_mqtt_topic_info_t));
		int msg_len = 0;
		char msg_pub[MQTT_MSGLEN] = { 0 };
		topic_msg.qos = IOTX_MQTT_QOS0;
		topic_msg.retain = 0;
		topic_msg.dup = 0;
		topic_msg.payload = (char *)msg_pub;
		topic_msg.payload_len = msg_len;
		memset(msg_pub, 0x0, MQTT_MSGLEN);
		msg_len = sprintf(msg_pub, PAYLOAD_FORMAT_TAG, cnt++, ALINK_METHOD_TAG_UPDATE, params.c_str());

		if (msg_len < 0) {
			EXAMPLE_TRACE("Error occur! Exit program");
			continue;
		}

		topic_msg.payload = (char *)msg_pub;
		topic_msg.payload_len = msg_len;
		std::string device_topic_tag_up = getAliyunTagUpTopic(itdev->first);
		if (!device_topic_tag_up.empty()) {
			int rc = IOT_MQTT_Publish(pclient, device_topic_tag_up.c_str(), &topic_msg);
			IOT_MQTT_Yield(pclient, 200);
			if (rc < 0) {
				//EXAMPLE_TRACE("error occur when publish");
				CLogger::createInstance()->Log(MsgInfo
					, "%s|%03d :: \n publish message fail: \n topic: %s \n payload(%d): %s \n rc = %d"
					, __func__
					, __LINE__
					, device_topic_tag_up.c_str()
					, topic_msg.payload_len
					, topic_msg.payload
					, rc);
			}
			else {
				EXAMPLE_TRACE("packet-id=%u, publish topic msg=%s"
					, (uint32_t)rc, msg_pub);
			}
		}
	}
};

void IOToAliyun::devTagDel()
{
	for (std::map<unsigned long long, AliyunDevices>::iterator itdev = deviceTriples.begin();
		itdev != deviceTriples.end(); ++itdev)
	{
		if (itdev->second.dev_tags.empty())
		{
			continue;
		}
		std::string params = "";
		std::map<std::string, std::string>::iterator itTagLast = itdev->second.dev_tags.end();
		itTagLast--;
		for (std::map<std::string, std::string>::iterator itTag = itdev->second.dev_tags.begin();
			itTag != itdev->second.dev_tags.end(); ++itTag)
		{
			char buf[128] = { 0 };
			sprintf(buf, "{\"attrKey\":\"%s\"}"
				, itTag->first.c_str());
			if (strlen(buf) > 0)
			{
				params += std::string(buf, strlen(buf));
				if (itTag != itTagLast)
				{
					params += ",";
				}
			}
		}
		if (params.empty())
			continue;
		//
		iotx_mqtt_topic_info_t topic_msg;
		memset(&topic_msg, 0x0, sizeof(iotx_mqtt_topic_info_t));
		int msg_len = 0;
		char msg_pub[MQTT_MSGLEN] = { 0 };
		topic_msg.qos = IOTX_MQTT_QOS0;
		topic_msg.retain = 0;
		topic_msg.dup = 0;
		topic_msg.payload = (char *)msg_pub;
		topic_msg.payload_len = msg_len;
		memset(msg_pub, 0x0, MQTT_MSGLEN);
		msg_len = sprintf(msg_pub, PAYLOAD_FORMAT_TAG, cnt++, ALINK_METHOD_TAG_DELETE, params.c_str());

		if (msg_len < 0) {
			EXAMPLE_TRACE("Error occur! Exit program");
			continue;
		}

		topic_msg.payload = (char *)msg_pub;
		topic_msg.payload_len = msg_len;
		std::string device_topic_tag_del = getAliyunTagDelTopic(itdev->first);
		if (!device_topic_tag_del.empty()) {
			int rc = IOT_MQTT_Publish(pclient, device_topic_tag_del.c_str(), &topic_msg);
			IOT_MQTT_Yield(pclient, 200);
			if (rc < 0) {
				//EXAMPLE_TRACE("error occur when publish");
				CLogger::createInstance()->Log(MsgInfo
					, "%s|%03d :: \n publish message fail: \n topic: %s \n payload(%d): %s \n rc = %d"
					, __func__
					, __LINE__
					, device_topic_tag_del.c_str()
					, topic_msg.payload_len
					, topic_msg.payload
					, rc);
			}
			else {
				EXAMPLE_TRACE("packet-id=%u, publish topic msg=%s"
					, (uint32_t)rc, msg_pub);
			}
		}
	}
};

bool IOToAliyun::getCacheInfos(std::map<unsigned long long, std::queue<JsonPValue> > &its, int sizel)
{
	int rsize = cache_to_aliyun_queue->size();
	if (rsize <= 0)
	{
		return false;
	}
	rsize = rsize<sizel ? rsize : sizel;
	SocketAliyunWriteItem it;
	for(int i=0;i<rsize;++i){
		if(cache_to_aliyun_queue->pop(it))
		{
			std::map<unsigned long long, std::queue<JsonPValue> >::iterator itpos = its.find(it.devID);
			if (itpos != its.end()) 
			{
				JsonPValue jval;
				jval.pID = it.pID;
				jval.ptype = it.ptype;
				jval.sec = it.evtTimeS;
				jval.msec = it.evtTimeMS;
				jval.val = it.val;
				//jval.taskID = it.taskID;
				its[it.devID].push(jval);
			}
			else {
				std::queue<JsonPValue> pids;
				JsonPValue jval;
				jval.pID = it.pID;
				jval.ptype = it.ptype;
				jval.sec = it.evtTimeS;
				jval.msec = it.evtTimeMS;
				jval.val = it.val;
				//jval.taskID = it.taskID;
				pids.push(jval);
				its[it.devID] = pids;
			}
		}else{
			break;
		}
	}
	return !its.empty();
}
//属性发布
void IOToAliyun::sendProperty()
{
	std::map<unsigned long long, std::queue<JsonPValue> > its;
	if (this->getCacheInfos(its, 10))
	{
		for (std::map<unsigned long long, std::queue<JsonPValue> >::iterator itdev = its.begin();
			itdev != its.end(); ++itdev)
		{
			//生成json格式的发布消息
			std::string params = "{";
			while (!itdev->second.empty())
			{
				JsonPValue jval = itdev->second.front();
				std::string key_ = getAliyunKey(itdev->first, jval.pID);
				if (!key_.empty())
				{
					char buf[256] = { 0 };
					switch (jval.ptype)
					{
					case pyfree::OnYX:case pyfree::OnYXS:
						sprintf(buf, "\"%s\":%d", key_.c_str(), jval.val>0?1:0);
						break;
					case pyfree::OnYC:case pyfree::OnYCS:
						sprintf(buf, "\"%s\":%d", key_.c_str(), static_cast<int>(jval.val));
						break;
					default:
						break;
					}
					if (strlen(buf) > 0) {
						if ("{" != params)
							params += ",";
						params += std::string(buf, strlen(buf));
					}
				}
				itdev->second.pop();
			}
			if ("{" == params)
			{
				continue;
			}
			params += "}";
			//构建阿里云物联网平台的消息
			iotx_mqtt_topic_info_t topic_msg;
			memset(&topic_msg, 0x0, sizeof(iotx_mqtt_topic_info_t));
			int msg_len = 0;
			char msg_pub[MQTT_MSGLEN] = { 0 };
			topic_msg.qos = IOTX_MQTT_QOS0;
			topic_msg.retain = 0;
			topic_msg.dup = 0;
			topic_msg.payload = (char *)msg_pub;
			topic_msg.payload_len = msg_len;
			memset(msg_pub, 0x0, MQTT_MSGLEN);
			msg_len = sprintf(msg_pub, PAYLOAD_FORMAT, cnt++, ALINK_METHOD_PROP_POST, params.c_str());
			if (msg_len < 0) {
				EXAMPLE_TRACE("Error occur! Exit program");
				continue;
			}
			topic_msg.payload = (char *)msg_pub;
			topic_msg.payload_len = msg_len;
			std::string device_topic_post = getAliyunPostTopic(itdev->first);//获取发布主题
			if (!device_topic_post.empty()) {
				int rc = IOT_MQTT_Publish(pclient, device_topic_post.c_str(), &topic_msg);	//消息发布
				IOT_MQTT_Yield(pclient, 200);
				if (rc < 0) {
					//EXAMPLE_TRACE("error occur when publish");
					CLogger::createInstance()->Log(MsgInfo
						, "%s|%03d :: \n publish message fail: \n topic: %s \n payload(%d): %s \n rc = %d"
						, __func__
						, __LINE__
						, device_topic_post.c_str()
						, topic_msg.payload_len
						, topic_msg.payload
						, rc);
				}
				else {
					EXAMPLE_TRACE("packet-id=%u, publish topic msg=%s", (uint32_t)rc, msg_pub);
				}
			}
			HAL_SleepMs(100);
		}
	}
}
//事件发布
void IOToAliyun::sendEvent()
{
	JsonEvent item;
	if (to_aliyun_event_queue->getFirst(item)) {
		//your code
		//ASCII2UTF_8
		if (!gatewayEI.isValid() || gatewayEI.outs.empty())
		{
			//
			to_aliyun_event_queue->removeFirst();
			return;
		}
		//
		unsigned long long ut = 1000*(static_cast<unsigned long long>(time(NULL)));
		std::string comment = "";
		for (unsigned int i=0;i<gatewayEI.outs.size();i++)
		{
			char buf[64] = { 0 };
			if ("etime"== gatewayEI.outs.at(i)) {
				sprintf(buf, "\"etime\":\"%s\"", item.etime.c_str());
			}
			else if ("etask" == gatewayEI.outs.at(i)) {
				sprintf(buf, "\"etask\":\"%s\"", item.etask.c_str());
			}
			else if ("etype" == gatewayEI.outs.at(i)) {
				sprintf(buf, "\"etype\":\"%s\"", item.etype.c_str());
			}
			else if ("elevel" == gatewayEI.outs.at(i)) {
				sprintf(buf, "\"elevel\":\"%s\"", item.elevel.c_str());
			}
			else if ("earea" == gatewayEI.outs.at(i)) {
				sprintf(buf, "\"earea\":\"%s\"", item.earea.c_str());
			}
			else if ("edev" == gatewayEI.outs.at(i)) {
				sprintf(buf, "\"edev\":\"%s\"", item.edev.c_str());
			}
			else if ("epid" == gatewayEI.outs.at(i)) {
				sprintf(buf, "\"epid\":\"%s\"", item.epid.c_str());
			}
			else if ("epval" == gatewayEI.outs.at(i)) {
				sprintf(buf, "\"epval\":\"%s\"", item.epval.c_str());
			}
			else {
				continue;
			}
			if (!comment.empty())
			{
				comment += ",";
			}
			comment += std::string(buf);
		}
		char buf[512] = { 0 };
		sprintf(buf,"{\"value\":{%s},\"time\":%llu}"
			, comment.c_str()
			, ut);
		//
		std::string buf_str = std::string(buf);
		// buf_str = ASCII2UTF_8(buf_str);
		//
		iotx_mqtt_topic_info_t topic_msg;
		memset(&topic_msg, 0x0, sizeof(iotx_mqtt_topic_info_t));
		int msg_len = 0;
		char msg_pub[MQTT_MSGLEN] = { 0 };
		topic_msg.qos = IOTX_MQTT_QOS0;
		topic_msg.retain = 0;
		topic_msg.dup = 0;
		topic_msg.payload = (char *)msg_pub;
		topic_msg.payload_len = msg_len;
		memset(msg_pub, 0x0, MQTT_MSGLEN);
		char buf_method[128] = { 0 };
		sprintf(buf_method,ALINK_METHOD_EVENT_UP, gatewayEI.aliEventFlag.c_str());
		msg_len = sprintf(msg_pub, PAYLOAD_FORMAT, cnt++, buf_method, buf_str.c_str());

		if (msg_len < 0) {
			EXAMPLE_TRACE("Error occur! Exit program");
			to_aliyun_event_queue->removeFirst();
			return;
		}
		topic_msg.payload = (char *)msg_pub;
		topic_msg.payload_len = msg_len;
		std::string device_topic_event = getAliyunEventUpTopic();
		if (!device_topic_event.empty()) {
			int rc = IOT_MQTT_Publish(pclient, device_topic_event.c_str(), &topic_msg);
			IOT_MQTT_Yield(pclient, 200);
			if (rc < 0) {
				//EXAMPLE_TRACE("error occur when publish");
				CLogger::createInstance()->Log(MsgInfo
					, "%s|%03d :: \n publish message fail: \n topic: %s \n payload(%d): %s \n rc = %d"
					, __func__
					, __LINE__
					, device_topic_event.c_str()
					, topic_msg.payload_len
					, topic_msg.payload
					, rc);
			}
			else {
				EXAMPLE_TRACE("packet-id=%u, publish topic msg=%s", (uint32_t)rc, msg_pub);
			}
		}

		//
		to_aliyun_event_queue->removeFirst();
	}
}

std::string IOToAliyun::getAliyunPostTopic(unsigned long long devId)
{
	std::map<unsigned long long, pyfree::AliyunDevices>::iterator it = deviceTriples.find(devId);
	if (it != deviceTriples.end())
	{
		//std::string topic_ = "/sys/"+it->second.atriples.product_key+"/"
		//	+it->second.atriples.device_name+"/thing/event/property/post";
		char buf[256] = { 0 };
		sprintf(buf, ALINK_TOPIC_EVENT_PRO_POST
			, it->second.atriples.product_key.c_str()
			, it->second.atriples.device_name.c_str());
		std::string topic_ = std::string(buf, strlen(buf));
		return topic_;
	}
	return "";
}

std::string IOToAliyun::getAliyunEventUpTopic()
{
	char buf[256] = { 0 };
	sprintf(buf, ALINK_TOPIC_EVENT_UP
		, gatewayTriples.product_key.c_str()
		, gatewayTriples.device_name.c_str()
		, gatewayEI.aliEventFlag.c_str());
	std::string topic_ = std::string(buf, strlen(buf));
	return topic_;
}

std::string IOToAliyun::getAliyunTagUpTopic(unsigned long long devId)
{
	std::map<unsigned long long, pyfree::AliyunDevices>::iterator it = deviceTriples.find(devId);
	if (it != deviceTriples.end())
	{
		char buf[256] = { 0 };
		sprintf(buf, ALINK_TOPIC_TAG_UPDATE
			, it->second.atriples.product_key.c_str()
			, it->second.atriples.device_name.c_str());
		std::string topic_ = std::string(buf, strlen(buf));
		return topic_;
	}
	return "";
}

std::string IOToAliyun::getAliyunTagDelTopic(unsigned long long devId)
{
	std::map<unsigned long long, pyfree::AliyunDevices>::iterator it = deviceTriples.find(devId);
	if (it != deviceTriples.end())
	{
		char buf[256] = { 0 };
		sprintf(buf, ALINK_TOPIC_TAG_DELETE
			, it->second.atriples.product_key.c_str()
			, it->second.atriples.device_name.c_str());
		std::string topic_ = std::string(buf, strlen(buf));
		return topic_;
	}
	return "";
}

std::string IOToAliyun::getAliyunKey(unsigned long long devId, unsigned int pId)
{
	std::map<unsigned long long, pyfree::AliyunDevices>::iterator it =  deviceTriples.find(devId);
	if (it != deviceTriples.end())
	{
		std::map<unsigned int, std::string>::iterator itp =  it->second.aliyun_keys.find(pId);
		if (itp != it->second.aliyun_keys.end())
		{
			return itp->second;
		}
	}
	return "";
}

void* IOToAliyun::run()
{
	int linkStateFailCount = 0;//鉴于4G路由联网问题,MQTT构造完成去无法联网,如果多次判定通信不成功,重新构造链接信息
	while (running)
	{
		if (NULL == pclient)
		{
			create();//create pclient;
		} else {
			if (IOT_MQTT_CheckStateNormal(pclient))
			{
				if (!initLink)
				{
					////devTagDel();		//删除设备标签
					//devOffLine();
					//unsubscribe();
					////
					CLogger::createInstance()->Log(MsgInfo
						, "%s|%03d :: init Link and subscribe do it."
						, __func__
						, __LINE__);
					subscribe(true);
					devOnLine();
					subscribe(false);//子设备上线后才订购其相关信息
					devTagUp();		//子设备tag上送
					initLink = true;
				}
				else {
					if (520 == G_VALUE::IOT_CODE)
					{
						initLink = false;
						G_VALUE::IOT_CODE = 0;
					}
				}
				sendProperty();
				sendEvent();
				/* handle the MQTT packet received from TCP or SSL connection */
				IOT_MQTT_Yield(pclient, 200);
			}
			else {
				if (initLink)
				{
					initLink = false;
				}
				/* handle the MQTT packet received from TCP or SSL connection */
				IOT_MQTT_Yield(pclient, 1000);
				linkStateFailCount++;
				if (linkStateFailCount>100)//判定通信不正常超过100次,重新构造链接资源解决
				{
					linkStateFailCount = 0;
					this->destroy();
				}
			}
		}
		HAL_SleepMs(100);
	}
	if (IOT_MQTT_CheckStateNormal(pclient))
	{
		//devTagDel();		//删除设备标签
		devOffLine();
		unsubscribe();
	}
	destroy();
	return 0;
}
